package com.itcast.hadoop.flowsort;/**
 * Created by Administrator on 2019/4/3 0003.
 */

import com.itcast.hadoop.flowsum.FlowBean;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * MapReduce job that re-emits flow records keyed by {@link FlowBean}, so that the
 * shuffle phase orders the output according to the comparison {@code FlowBean} defines.
 *
 * <p>Input lines are tab-separated: phone number, upstream flow, downstream flow,
 * optionally followed by further columns that are ignored, e.g.
 * <pre>
 * 13480253104     180     200     380
 * 13502468823     102     7335    7437
 * </pre>
 *
 * <p>Usage: {@code SortMR <input path> <output path>}
 *
 * <p>Originally written by ydf (kt), 2019-04-03.
 *
 * @author ydf
 */
public class SortMR {

    /** Counter group under which this job reports skipped input lines. */
    private static final String COUNTER_GROUP = "SortMR";

    /** Counter name for input lines that could not be parsed into a record. */
    private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

    /** Minimum number of tab-separated columns a line needs: phone, up flow, down flow. */
    private static final int MIN_FIELDS = 3;

    /**
     * Parses each input line into a {@link FlowBean} and emits it as the map output key
     * (with a {@link NullWritable} value) so the framework sorts records by the bean itself.
     */
    public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        /**
         * Splits one line on tabs, builds a {@code FlowBean} from the first three columns
         * and writes it as the key.
         *
         * <p>Lines with fewer than three columns (including blank lines) or with
         * non-numeric flow columns are skipped and counted in the
         * {@code SortMR / MALFORMED_LINES} counter instead of failing the task.
         *
         * @param key     input key supplied by the input format; not used
         * @param value   one line of input text
         * @param context task context used to emit output and update counters
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] fields = StringUtils.split(value.toString(), "\t");
            if (fields == null || fields.length < MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            long upFlow;
            long downFlow;
            try {
                upFlow = Long.parseLong(fields[1]);
                downFlow = Long.parseLong(fields[2]);
            } catch (NumberFormatException e) {
                // A bad record should not kill the whole job; make it visible via the counter.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            context.write(new FlowBean(fields[0], upFlow, downFlow), NullWritable.get());
        }
    }

    /**
     * Writes every sorted {@link FlowBean} back out, keyed by its phone number.
     */
    public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        /**
         * Emits one output record per value in the group.
         *
         * <p>Keys that compare as equal are delivered in a single {@code reduce} call.
         * Writing only once per call would drop all but the first of them, so the values
         * are iterated and a record is written for each one. The phone number is re-read
         * inside the loop because Hadoop is expected to refresh the key object as the
         * value iterator advances (NOTE(review): confirm for the Hadoop version in use).
         *
         * @param key     the current bean for this group
         * @param values  one {@code NullWritable} per record grouped under {@code key}
         * @param context task context used to emit output
         * @throws IOException          if writing an output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for (NullWritable ignored : values) {
                context.write(new Text(key.getPhoneNB()), key);
            }
        }
    }

    /**
     * Configures and runs the sort job, then exits the JVM with status 0 on success,
     * 1 if the job fails, or 2 if the command-line arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved during submission
     * @throws InterruptedException   if interrupted while waiting for the job to finish
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length < 2) {
            System.err.println("Usage: SortMR <input path> <output path>");
            System.exit(2);
        }

        // Create the configuration and the job.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Class used to locate the jar that contains this job.
        job.setJarByClass(SortMR.class);

        // Mapper and reducer implementations.
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Key/value types emitted by the reducer.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
